package hyl.ext.web.ms;

import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.servlet.http.HttpServletResponse;

import hyl.base.cache2.ExToken;
import hyl.base.net.IpgReader;
import hyl.base.net.nio.NIOReader;
import hyl.base.net.nio.NIOSocketSrv;
import hyl.core.AIni;
import hyl.core.MyFun;
import hyl.core.fun.MyByte;
import hyl.core.io.MyFile;
import hyl.core.io.MyPath;
import hyl.core.net.MyHttp;
import hyl.core.net.pg.Netpg2Msgs1;
import hyl.core.run.IDo1;
import hyl.core.run.MyRun;
import hyl.core.run.MyTimer;
import hyl.core.safe.MyRsa;
import hyl.ext.base.MySession;
import hyl.ext.base.Response;

/**
 * 微服务注册,微服务删除,服务重启,服务停止,服务启动,服务升级,服务查看 MsA 会伴随 单点系统 启动
 * 
 * 
 * TMsB 发送 Netpg2Req1 包
 * 
 * TMsA 反馈 Netpg2Msgs 包
 * 
 * @author 37798955@qq.com
 *
 */
public class TMsA extends TMs {
	// 60s过期=60000
	/**
	 * 缓存 所有已经登记的子系统信息 key: appid value:MsInfo
	 */
	ConcurrentHashMap<String, BInfo> _msmap = null;
	// 在线终端
	ConcurrentHashMap<NIOReader, BInfo> _mschl = null;
	/**
	 * 持久化 子系统接驳信息
	 */
	// CacheBytes _kvms = null;
	/**
	 * 定时持久化 子系统缓存
	 */

	MyTimer _mt = null;
	final int _过期毫秒 = 5000;
	final int _默认端口 = 6220;

	static TMsA _ma;
	static final String cfgfile = "msa";

	public TMsA() {
		super();
	}

	public static TMsA getInstance() {
		if (_ma == null)
			_ma = new TMsA();
		return _ma;
	}

	public void init() {

		load密钥();
		_binfo = new BInfo();
		File file = MyFile.find(cfgfile);

		if (file == null) {
			file = MyFile.openFile(MyPath.getRootPath() + cfgfile);
		}

		_binfo.init(file);
		_binfo.save();
		// 载入库中所有客户端的配置
		// _kvms = new CacheBytes("微服务库", "1");
		loadAppInfos();
//		_kvms.do遍历(null, (k, v) -> {
//			_msmap.put(MyFun.bytes2U8str(k), bytes2InfoMsm(v));
//			return false;
//		});

		int port = _binfo.a_port < 100 ? _默认端口 : _binfo.a_port;
		try {
			启动服务(port);
		} catch (IOException e) {
			e.printStackTrace();
		}
		// 定时保存();
	}

//	void 保存() {
//		if (_msmap.isEmpty())
//			return;
//		long now = MyFun.getMs();
//		_msmap.forEach((k, v) -> {
//			if ((now - v.ptime) > _过期毫秒) {
//				v.state = (byte) 0;
//			}
//			_kvms.put(MyFun.u8str2Bytes(k), infoMsm2bytes(v));
//		});
//	}

//	void 定时保存() {
//		_mt = MyTimer.getInstance("TMs定时", 1);
//		// 每隔10s保存一次
//		_mt.start(() -> {
//			保存();
//		}, 10000);
//	}
	NIOSocketSrv nss = null;

	/**
	 * 接入服务中心
	 * 
	 * @throws IOException
	 */
	void 启动服务(int port) throws IOException {
		// MyFun.print(port);
		nss = new NIOSocketSrv(port);

		IDo1<IpgReader> handle = new IDo1<IpgReader>() {
			@Override
			public void run(IpgReader br) {
				byte[] data;
				NIOReader reader = (NIOReader) br;
				while ((data = br.pullData()) != null) {
					Netpg2Msgs1 npg = _npg.clone();
					npg.ini(data);
					int cmd = npg.getCmd();
					String[] str = npg.getMsgs();
					BInfo bInfo = _mschl.get(br);
					switch (cmd) {
					case I未知:
						// 什么都不做
						break;
					case I注册:
						if (MyFun.isEmpty(str))
							continue;
						String mm = MyRsa.f私钥解密uri(K私钥, str[0]);
						if (MyFun.isEmpty(mm))
							continue;
						String mms[] = mm.split(",");
						String appid = mms[0];
						String key = mms[1];
						// 此时的str 必须是appid,如果未空或者不在集合中 不允许连接
						// 如果 appid 长度不符合标准 退出
						if (MyFun.isEmpty(appid) || appid.length() != 32) {
//						Netpg2Msgs np = _np.clone();
//						np.set(1, "appid非法!!");
//						br.send(np.getSendBytes());
							AIni.Log.error(MyFun.join("appid非法!! 来自", br.get远程连接()));
							// 主动关闭 客户端连接会导致 TIME_WAIT 解决方法请看
							// https://blog.csdn.net/niu91/article/details/115582126
							br.close();
							continue;//
						}
						onConn(appid, key, npg, reader);
						break;
					case IPING: // ping
						onPing(bInfo, npg.getB参数());
						break;
					case IERR: // err
						onErr(str[0], str[1]);
						break;
					case I内部请求: // 请求对方token,响应对方token
						// 检查token 是否存在
						try {
							resB2(br, npg);
						} catch (Exception e) {
						}
						break;
					case I内部应答:// 响应对方token,询问对方有没有token
						resA3(br, npg);
						break;

					case I请求处理:// 收到对方的消息
						if (on收到消息处理 != null)
							MyRun.start用户线程(() -> {
								on收到消息处理.run(npg);
							});
						break;
					case I请求应答:// 收到对方的消息并应答
						if (on收到消息处理并反馈 != null)
							MyRun.start用户线程(() -> {
								Netpg2Msgs1 np1 = on收到消息处理并反馈.ask(npg);
								if (np1 == null)
									return;
								// 1000以内是系统程序范畴
								np1.setCmd(I请求应答);
								br.send(np1.toSendBytes());
							});
						break;
					default:// 非规范的指令时,断开连接
						// br.close();
						break;
					}
					// 这里不能追加代码.. 防止onConn 阻拦
				}
			}
		};
		nss.set_IDo接收处理函数(handle);
		nss.startServer();

	}

//	BlockMap<String, Netpg2Msgs1> bm2 = new BlockMap<>();
//
//	public Netpg2Msgs1 f请求(String appid, String 序列号, String... msgs) {
//		BInfo bInfo = _msmap.get(appid);
//		Netpg2Msgs1 np = _npg.clone();
//		np.set(4, null, msgs);
//		bInfo._reader.send(np.getSendBytes());
//		return bm2.get(appid + "," + 序列号);
//	}

	/**
	 * 注册处理 包含身份验证
	 */
	void onConn(String appid, String key, Netpg2Msgs1 netpg, NIOReader reader) {
		// 如果ip 不在白名单中 拒绝连接 br.close();未开发 ???? 后续增加
		BInfo bi = _msmap.get(appid);

		boolean insert = true;
		if (bi == null) {
			// 如果缺少info 创建一个
			bi = new BInfo();
			bi.setAppid(appid);
			_msmap.put(appid, bi);
		} else {
			insert = false;
		}
		// 如果该服务在运行,不能重复运行
		if (bi.state > 0) {
			if (bi._reader.isConnected()) {
				Netpg2Msgs1 np = _npg.clone();
				np.set(IERR, null, appid + "微服务运行中...");
				reader.send(np.toSendBytes());
				_mschl.remove(reader);
				reader.close();
				MyFun.print("b系统(", appid, ")重复接入,已拒绝....");
				return;
			}
		}
		_mschl.put(reader, bi);
		MyFun.print("b系统(", appid, ")接入成功.......");
		// 绑定 reader 和 msinfo,这样通过 appid 就可以取出 reader 进行定向发送
		bi._reader = reader;
		bi.setKeystr(key);
		// MyFun.print(appid,key);
		bi.ready();
		bi.b_ip = reader.get远程连接().toString();
		// 设置所有信息
		Map<String, String> info = BInfo.decodeToMap(netpg.getB参数());
		// MyFun.printJson(info);
		bi.pub信息 = info;
		if (insert) {
			bi.insertRow(getDB());
		} else {
			bi.updateRow(getDB());
		}
		Netpg2Msgs1 np = _npg.clone();
		// MyFun.print(bi.b_ip, _ma._binfo.aUrl);
		np.set(I注册, null, bi.b_ip, _ma._binfo.aUrl);
		reader.send(np.toSendBytes());
	}

	/**
	 * 异常处理
	 */
	void onErr(String APPID, String err) {

		MyFun.print("cmd=2", APPID, err);
		BInfo im = _msmap.get(APPID);
		im.err(err);

	}

	/**
	 * ping处理
	 */
	void onPing(BInfo im, byte[] keystr) {
		// MyFun.print(" cmd=1", im.getAppid());

		if (keystr != null) {
			String mm = im.aes解密(keystr);
			im.setKeystr(mm);
		}
		im.ping();
	}

//	void onTokenCreate(String APPID, IpgReader read) {
//		MyFun.print("cmd=3", APPID);
//		ExToken etoken = _ma.createExToken();
//		String token = MyRsa.f私钥加密uri(K私钥, etoken.getId());
//		Netpg2Msgs1 np = _npg.clone();
//		np.set(1, null, token);
//		read.send(np.getSendBytes());
//	}

	public static BInfo bytes2InfoMsm(byte[] 对象) {
		long ptm = MyFun.byteArray2Long(对象, 0);
		BInfo im = new BInfo();
		im.ptime = ptm;
		im.decode(MyByte.subBytes(对象, 8));
		return im;
	}

	public static byte[] infoMsm2bytes(BInfo 对象) {
		byte[] pt = MyFun.long2ByteArray(对象.ptime);
		return MyFun.concat(pt, 对象.encode());
	}

	public Map<String, BInfo> getMap() {
		return _msmap;
	}

	public BInfo getBInfo(String appid) {
		return _msmap.get(appid);
	}

	public boolean containApp(String appid) {
		return _msmap.containsKey(appid);
	}

	public boolean isBappOnline(String appid_b) {
		BInfo bi = _msmap.get(appid_b);
		if (bi == null)
			return false;
		if (bi.isConnected() && bi.getState() > BInfo.D停止)
			return true;
		return false;
	}

	public void resB2(IpgReader read, Netpg2Msgs1 npg) {
		Netpg2Msgs1 np = new Netpg2Msgs1();
		String cmd = npg.msgs[0];
		String token = null;
		if (cmd.equals(S验证令牌)) {
			token = npg.msgs[1];
			np.set(I内部应答, null, S验证令牌, token, (_etlist.contain_1(token)) ? "Y" : "N");
		} else if (cmd.equals(S获取令牌)) {
			ExToken et = createExToken();
			if (npg.msgs.length > 1)
				token = npg.msgs[1];
			np.set(I内部应答, null, S获取令牌, token, et.getId());
		}
		read.send(np.toSendBytes());
	}

	/**
	 * 规定 msgs[2] 是返回值
	 * 
	 * @param read
	 * @param npg
	 */
	public void resA3(IpgReader read, Netpg2Msgs1 npg) {
		BInfo bi = _mschl.get(read);
		String cmd = npg.msgs[0];
		String key = npg.msgs[1];
		String value = npg.msgs[2];
		_阻塞map.put(MyFun.join(bi.appid, cmd, key), value);
	}

	// 缺陷还不支持并发
	/**
	 * 至少2个参数 0:默认是cmd ;1:是key ;2:后可扩展
	 * 
	 * @param param
	 * @return
	 */
	public String req向B提问(String appid, String... param) {
		if (param == null) {
			return null;
		}
		Netpg2Msgs1 np = new Netpg2Msgs1();
		BInfo bi = _msmap.get(appid);
		if (bi == null || bi._reader == null)
			return null;
		np.set(I内部请求, null, param);
		String key = MyFun.join(appid, param[0], param[1]);
		_阻塞map.open(key);
		bi._reader.send(np.toSendBytes());
		return _阻塞map.get(key); // 等待赋值 后返回
	}

	public void sendMsg(String appid, byte[] bytes, String... msgs) {
		Netpg2Msgs1 np = new Netpg2Msgs1();
		BInfo bi = _msmap.get(appid);
		if (bi == null || bi._reader == null)
			return;
		np.set(I请求处理, bytes, msgs);
		bi._reader.send(np.toSendBytes());
	}

	public void request(String appid, byte[] bytes, String... msgs) {
		BInfo bi = _msmap.get(appid);
		if (bi == null || bi._reader == null)
			return;
		Netpg2Msgs1 np = generate反馈(bytes, msgs);
		bi._reader.send(np.toSendBytes());
	}

//////////////////////////

	public void f从A跳转到B(HttpServletResponse res, MySession sessiona, String appidb) {
		boolean online = isBappOnline(appidb);
		// 不在线 退出
		if (!online)
			return;

		// 如果 appidb 不在白名单 不允许访问
		String sessip = sessiona.getIp();
		// 登录a之后 session_a ->userid_a-> appid_b -> userid_b and sessionid_b
		// 如果没有 说明用户没有 在A,B都注册账号
		ABOpen opena = getABOpen1(sessiona.getUserId(), appidb);
		if (opena == null)
			return;

		String tempkey = MyFun.getMsDesc();
		String 明文 = MyFun.join2(",", appidb, opena.uidB, opena.sidB, tempkey, sessip);
		BInfo mi = getBInfo(appidb);
		String mm = mi.aes加密64(明文);
		String bUrl = _binfo.get("BURL");
		String url = MyFun.join(bUrl, "/a_b?key=", tempkey, "&mm=", mm);
		Response.direct(res, url);

		// 有sessionid_b 通过前端直接访问,也会因为ip 不一样 导致无法登录
		// 如果sessionid_b 为空或不正确 让b创建一个新的 sessionid_b
		// f4_2_b(mm, tempkey);
	}

/////////////////////////////tmsa dao///////////////////////////

	public ABOpen getABOpen1(Integer userida, String appidb) {
		String sql = "select * from app_user where uidA=? and appB=?";
		Object[] params = { userida, appidb };
		Map<String, Object> mp = getDB().queryMap(sql, params);
		if (mp == null || mp.isEmpty())
			return null;
		return new ABOpen(mp);
	}

	public ABOpen getABOpen2(String appidb, Integer useridb) {
		String sql = "select * from app_user where uidB=? and appB=?";
		Object[] params = { useridb, appidb };
		Map<String, Object> mp = getDB().queryMap(sql, params);
		//MyFun.printJson(mp);
		if (mp == null || mp.isEmpty())
			return null;
		return new ABOpen(mp);
	}

	public void insertABOpen(ABOpen 记录) {
		String sql = " insert into app_user (openid,appB,uidA,unamea,uidB,unameb) values (?,?,?,?,?,?)";
		Object[] params = { 记录.openid, 记录.appB, 记录.uidA, 记录.unameA, 记录.uidB, 记录.unameB };
		getDB().executeUpdate(sql, params);
	}

	public void updateABOpen(ABOpen row) {
		String sql = " update app_user set appB=?,uidA=?,unameA=?,uidB=?,unameB=?,"
				+ "sidA=?,sidB=?,ipB=?,ok=? where openid=?";
		Object[] params = { row.appB, row.uidA, row.unameA, row.uidB, row.unameB, 
				row.sidA, row.sidB, row.ipB, row.ok,row.openid };
		getDB().executeUpdate(sql, params);
	}

	//////////////////// 请求B////////////////////////

	// 请求b的用户会话
	public String queryB用户令牌(String burl) {
		String url = MyFun.join(burl, "/getbtoken");
		HttpURLConnection conn = MyHttp.sendGet(url);
		return MyHttp.getResult(conn);
	}

	//////////////////////////////////////////

	// 载入 app 信息
	public void loadAppInfos() {
		_msmap = new ConcurrentHashMap<>();
		_mschl = new ConcurrentHashMap<>();

		// t0 是客户端上传的app名称, 客户端只会更新t0 不会更新 t15
		// t15 是服务的 重新命名的app 别名
		String sql = "select appid,l0,t0,t1,t2, t3,t4,t5,t6,t7,  t8,t9,t10,t11,t12,  t13,t14,i0 from app";
		List<Object[]> mmp = getDB().queryListD(sql, null);
		for (Object[] cols : mmp) {
			BInfo bInfo = new BInfo();
			bInfo.loadObjects(cols);
			// MyFun.print(JSON.toJSONString(bInfo));
			_msmap.put(bInfo.getAppid(), bInfo);
		}
		// 每隔5分钟检查一下 清除不在线的binfo 项
		MyTimer.getInstance().start(() -> {
			List<NIOReader> rds = new ArrayList<>();
			_mschl.forEach((rd, bi) -> {
				if (!rd.isConnected())
					rds.add(rd);
			});
			rds.forEach(rd -> {
				_mschl.remove(rd);
			});
		}, 300000);

	}
}
